Workflowsでワークフローから他のワークフローを並列実行してみる
Workflowsはご存知の通りGoogle Cloudのさまざまなリソースを呼び出してオーケストレーションするワークフローツールです。
そして、WorkflowsからWorkflowsのワークフローを呼び出すこともできます。
Workflowsから別のワークフローを呼び出すことで、親のワークフローから並列で子ワークフローに値を渡して、複数の子ワークフローで分散処理する、というようなワークフローを組むことができます。
※引用:https://cloud.google.com/workflows/docs/tutorials/execute-workflows-from-workflow?hl=ja
今回は公式リファレンスに並列実行で他のワークフローを呼び出すチュートリアルがあったので試してみました。
やってみる
※サービスアカウント作成やWorkflows APIの有効化などは済んでいる前提です
ソースコードの準備
親と子のワークフローを作成します。
まずは子のワークフローからです。
main:
params: [args]
steps:
- init:
assign:
- iteration : ${args.iteration}
- wait:
call: sys.sleep
args:
seconds: 10
- check_iteration_even_or_odd:
switch:
- condition: ${iteration % 2 == 0}
next: raise_error
- return_message:
return: ${"Hello world"+iteration}
- raise_error:
raise: ${"Error with iteration "+iteration}
ソースコードができたらworkflow-child
の名前でデプロイします。親のワークフローと同じリージョンです。
簡単に解説します。
-
init
- 目的:
args
からiteration
という変数を初期化 - 動作:
iteration
にargs.iteration
の値を代入
- 目的:
-
wait
- 目的: 指定された秒数だけ待機
- 動作:
sys.sleep
を呼び出して、10秒間の待機を実行
-
check_iteration_even_or_odd
- 目的:
iteration
が偶数か奇数かを判定 - 動作:
switch
文を使用して、iteration
が偶数の場合にraise_error
ステップに進みます。- 奇数の場合はこのステップをスキップし、
return_message
に進みます。
- 目的:
-
return_message
- 目的:
iteration
が奇数の場合、メッセージを返却 - 動作:
"Hello world"
にiteration
の値を連結して返します。 - 例:
iteration
が3の場合、"Hello world3"
を返却
- 目的:
-
raise_error
- 目的:
iteration
が偶数の場合、エラーを発生 - 動作:
"Error with iteration "
にiteration
の値を連結してエラーを発生させます。 - 例:
iteration
が2の場合、"Error with iteration 2"
というエラーを発生
- 目的:
このワークフローは、指定されたiteration
の値に基づいて動作が異なります。10秒の待機の後、iteration
が偶数ならエラーを発生させ、奇数ならメッセージを返却します。
呼び出し元の親のワークフローでは奇数と偶数の数字を渡すので、実行すると2種類の挙動を観察することができます。
それでは続けて親のワークフローを実装します。
main:
steps:
- init:
assign:
- execution_results: {} # Initialize a dictionary to store execution results
- execution_results.success: {} # Dictionary for successful executions
- execution_results.failure: {} # Dictionary for failed executions
- execute_child_workflows:
parallel:
shared: [execution_results]
for:
value: iteration
in: [1, 2, 3, 4]
steps:
- iterate:
try:
steps:
- execute_child_workflow:
call: googleapis.workflowexecutions.v1.projects.locations.workflows.executions.run
args:
workflow_id: "workflow-child"
# Specify location and project_id if necessary
# location: "your-location"
# project_id: "your-project-id"
argument:
iteration: ${iteration}
result: execution_result
- save_successful_execution:
assign:
- execution_results.success[string(iteration)]: ${execution_result}
except:
as: e
steps:
- save_failed_execution:
assign:
- execution_results.failure[string(iteration)]: ${e}
- logStep:
call: sys.log
args:
text: ${execution_results}
severity: DEBUG
- return_execution_results:
return: ${execution_results}
ほぼリファレンスのままなのですが、途中のlogStep
だけ追加しています。
- logStep:
call: sys.log
args:
text: ${execution_results}
severity: DEBUG
ログ出力して詳細を見たかったからです。
適当な名前でデプロイしてください。リージョンは子ワークフローと同じです。
こちらも解説します。
-
init
- 目的:
execution_results
という辞書を初期化し、成功と失敗の結果を格納するためのサブ辞書を作成 - 動作:
execution_results
は、全体の実行結果を格納するための変数ですexecution_results.success
は、成功した実行の結果を格納しますexecution_results.failure
は、失敗した実行の結果を格納します
- 目的:
-
execute_child_workflows
- 目的: 複数の子ワークフローを並列で実行し、それぞれの結果を変数に格納
- 動作:
parallel
ブロック内で、execution_results
を共有しながら、イテレーションごとに子ワークフローを実行します。子ワークフローの実行はコネクタgoogleapis.workflowexecutions.v1.projects.locations.workflows.executions.run
を用いますfor
ループで、1から4までのイテレーションを実行しますtry
ブロックで、子ワークフローを実行します- 成功した場合は、
execution_results.success
に結果を保存 - 失敗した場合は、
except
ブロックでエラーをキャッチし、execution_results.failure
にエラー情報を保存
-
logStep
- 目的: 実行結果をログに記録
- 動作:
sys.log
を呼び出し、execution_results
全体をログ出力
-
return_execution_results
- 目的: 最終的な実行結果を返却
- 動作:
execution_results
を返します。これには、すべての子ワークフロー実行結果が含まれています
このワークフローは、指定された範囲(1-4まで)のイテレーションに対して子ワークフローを並列に実行し、その結果を変数に格納します。各実行の成功または失敗に応じて結果を保存し、最後にすべての結果をログします。
この親ワークフローを実行すると、実際に子ワークフローが実行された場合の挙動を観察することができます。
実際に動かしてみる
親のワークフローを実行してみてください。
実行後にreturn_execution_results 成功
と表示されていたら成功しています。
ログを見てみましょう。
以下のようなJSONが出力されていると思います(以下は整形しています)
{
"failure": {
"2": {
"message": "Execution failed or cancelled.",
"operation": {
"argument": "{\"iteration\":2}",
"createTime": "2024-10-07T12:58:04.704360244Z",
"disableConcurrencyQuotaOverflowBuffering": true,
"duration": "10.154318247s",
"endTime": "2024-10-07T12:58:14.858678491Z",
"error": {
"context": "RuntimeError: \"Error with iteration 2\"\nin step \"raise_error\", routine \"main\", line: 18",
"payload": "\"Error with iteration 2\"",
"stackTrace": {
"elements": [
{
"position": {
"column": "9",
"length": "5",
"line": "18"
},
"routine": "main",
"step": "raise_error"
}
]
}
},
"name": "projects/**********/locations/asia-northeast1/workflows/workflow-child/executions/************************",
"startTime": "2024-10-07T12:58:04.704360244Z",
"state": "FAILED",
"status": {
"currentSteps": [
{
"routine": "main",
"step": "raise_error"
}
]
},
"workflowRevisionId": "000001-276"
},
"tags": ["OperationError"]
},
"4": {
"message": "Execution failed or cancelled.",
"operation": {
"argument": "{\"iteration\":4}",
"createTime": "2024-10-07T12:58:04.575646768Z",
"disableConcurrencyQuotaOverflowBuffering": true,
"duration": "10.135465986s",
"endTime": "2024-10-07T12:58:14.711112754Z",
"error": {
"context": "RuntimeError: \"Error with iteration 4\"\nin step \"raise_error\", routine \"main\", line: 18",
"payload": "\"Error with iteration 4\"",
"stackTrace": {
"elements": [
{
"position": {
"column": "9",
"length": "5",
"line": "18"
},
"routine": "main",
"step": "raise_error"
}
]
}
},
"name": "projects/**********/locations/asia-northeast1/workflows/workflow-child/executions/************************",
"startTime": "2024-10-07T12:58:04.575646768Z",
"state": "FAILED",
"status": {
"currentSteps": [
{
"routine": "main",
"step": "raise_error"
}
]
},
"workflowRevisionId": "000001-276"
},
"tags": ["OperationError"]
}
},
"success": {
"1": "Hello world1",
"3": "Hello world3"
}
}
1
と3
のワークフローが成功していて、2
と4
のワークフローが失敗していることが上記よりわかります。
子ワークフローの実行履歴を見てみます。
2つの成功と2つの失敗が記録されていますね。作成日時も同一で並列実行されている様子がわかります。
以上の結果より、親のワークフローから子のワークフローを並列実行できることがわかりました。
まとめ
ワークフローからワークフローを呼び出すことができることがわかりました。
この機能を用いると、複数の子ワークフローを並列で起動して、さらにその子ワークフローで並列で
処理をする、ということも可能です。
親は処理対象を子に渡すだけ、子は処理対象をじっくりと処理する、というように役割分断ができるようになります。そうすることで実装もシンプルになり保守性や可読性も向上するかもしれません。
複雑な処理を行う場合、単一のワークフローで処理するよりもこのようにワークフローを
分割することも視野に入れても良いのかもしれません。
今回は基本的なワークフローからワークフローの呼び出しを試してみました。
次のブログではもう少し深掘りしたいと思います。
それではまた。
参考